// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.es;

import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.security.SecureRandom;
import java.util.List;
import java.util.Map;

public class EsShardPartitions {

    private static final Logger LOG = LogManager.getLogger(EsShardPartitions.class);

    private final String indexName;
    // shardid -> host1, host2, host3
    private Map<Integer, List<EsShardRouting>> shardRoutings;
    private SinglePartitionDesc partitionDesc;
    private PartitionKey partitionKey;
    private long partitionId = -1;

    public EsShardPartitions(String indexName) {
        this.indexName = indexName;
        this.shardRoutings = Maps.newHashMap();
        this.partitionDesc = null;
        this.partitionKey = null;
    }

    /**
     * Parse shardRoutings from the json
     *
     * @param indexName indexName(alias or really name)
     * @param searchShards the return value of _search_shards
     * @return shardRoutings is used for searching
     */
    public static EsShardPartitions findShardPartitions(String indexName, String searchShards) throws DorisEsException {
        EsShardPartitions partitions = new EsShardPartitions(indexName);
        JSONObject jsonObject = (JSONObject) JSONValue.parse(searchShards);
        JSONArray shards = (JSONArray) jsonObject.get("shards");
        int size = shards.size();
        for (int i = 0; i < size; i++) {
            List<EsShardRouting> singleShardRouting = Lists.newArrayList();
            JSONArray shardsArray = (JSONArray) shards.get(i);
            for (Object o : shardsArray) {
                JSONObject indexShard = (JSONObject) o;
                String shardState = (String) indexShard.get("state");
                if ("STARTED".equalsIgnoreCase(shardState) || "RELOCATING".equalsIgnoreCase(shardState)) {
                    try {
                        singleShardRouting.add(new EsShardRouting((String) indexShard.get("index"),
                                ((Long) indexShard.get("shard")).intValue(), (Boolean) indexShard.get("primary"),
                                (String) indexShard.get("node")));
                    } catch (Exception e) {
                        LOG.error("fetch index [{}] shard partitions failure", indexName, e);
                        throw new DorisEsException(
                                "fetch [" + indexName + "] shard partitions failure [" + e.getMessage() + "]");
                    }
                }
            }
            if (singleShardRouting.isEmpty()) {
                LOG.error("could not find a healthy allocation for [{}][{}]", indexName, i);
                continue;
            }
            partitions.addShardRouting(i, singleShardRouting);
        }
        return partitions;
    }

    public void addHttpAddress(Map<String, EsNodeInfo> nodesInfo) {
        for (Map.Entry<Integer, List<EsShardRouting>> entry : shardRoutings.entrySet()) {
            List<EsShardRouting> shardRoutings = entry.getValue();
            for (EsShardRouting shardRouting : shardRoutings) {
                String nodeId = shardRouting.getNodeId();
                if (nodesInfo.containsKey(nodeId)) {
                    shardRouting.setHttpAddress(nodesInfo.get(nodeId).getPublishAddress());
                } else {
                    shardRouting.setHttpAddress(randomAddress(nodesInfo));
                }
            }
        }
    }

    public TNetworkAddress randomAddress(Map<String, EsNodeInfo> nodesInfo) {
        // return a random value between 0 and 32767 : [0, 32767)
        int seed = new SecureRandom().nextInt(Short.MAX_VALUE) % nodesInfo.size();
        EsNodeInfo[] nodeInfos = nodesInfo.values().toArray(new EsNodeInfo[0]);
        return nodeInfos[seed].getPublishAddress();
    }

    public void addShardRouting(int shardId, List<EsShardRouting> singleShardRouting) {
        shardRoutings.put(shardId, singleShardRouting);
    }

    public String getIndexName() {
        return indexName;
    }

    public Map<Integer, List<EsShardRouting>> getShardRoutings() {
        return shardRoutings;
    }

    public SinglePartitionDesc getPartitionDesc() {
        return partitionDesc;
    }

    public void setPartitionDesc(SinglePartitionDesc partitionDesc) {
        this.partitionDesc = partitionDesc;
    }

    public PartitionKey getPartitionKey() {
        return partitionKey;
    }

    public void setPartitionKey(PartitionKey partitionKey) {
        this.partitionKey = partitionKey;
    }

    public long getPartitionId() {
        return partitionId;
    }

    public void setPartitionId(long partitionId) {
        this.partitionId = partitionId;
    }

    @Override
    public String toString() {
        return "EsIndexState [indexName=" + indexName + ", partitionDesc=" + partitionDesc + ", partitionKey="
                + partitionKey + "]";
    }
}
